package com.at.state14;

import com.at.bean.WaterSensor;
import com.at.functions5.WaterSensorMapFunction3;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Map;

/**
 * TODO 统计每种传感器每种水位值出现的次数
 *
 * @author huangchao E-mail:fengquan8866@163.com
 * @version 创建时间：2024/9/30 16:21
 */
public class KeyedMapStateDemo3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 7777)
                .map(new WaterSensorMapFunction3())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
                );

        sensorDS.keyBy(r -> r.getId())
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {

                            MapState<Integer, Integer> vcCountMapState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                vcCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("vcCountMapState", Types.INT, Types.INT));
                            }

                            @Override
                            public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                                Integer vc = value.getVc();
                                if (vcCountMapState.contains(vc)) {
                                    Integer count = vcCountMapState.get(vc);
                                    vcCountMapState.put(vc, ++count);
                                } else {
                                    vcCountMapState.put(vc, 1);
                                }
                                StringBuilder outStr = new StringBuilder();
                                outStr.append("传感器id为").append(value.getId()).append("\n");
                                for (Map.Entry<Integer, Integer> vcCount : vcCountMapState.entries()) {
                                    outStr.append(vcCount.toString()).append("\n");
                                }
                                outStr.append("--------------------------------------\n");
                                out.collect(outStr.toString());
                            }
                        }
                )
                .print();

        env.execute();
    }
}
